# coding:utf-8

import json
from old_version.src.entity.Stock import Stock
from old_version.src import StockRecord
from old_version.src import StockIndex
from TimeUtil import TimeUtil
from thrift.transport import TSocket
from hbase import Hbase
from hbase.ttypes import *
# from src.service.StockRecordService import StockRecordService
from RedisUtil import RedisUtil

'''
Hbase工具类
'''


class HbaseUtil:

    def __init__(self):
        pass

    # thrift默认端口是9090
    socket = TSocket.TSocket('192.168.23.135', 9090)
    socket.setTimeout(50000)
    transport = TTransport.TBufferedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)


    '''
    返回所有Stock对象
    '''
    @staticmethod
    def get_all_stock():
        HbaseUtil.socket.open()
        stock_list = list()
        scanner = HbaseUtil.client.scannerOpen("stock", "", [])
        r = HbaseUtil.client.scannerGet(scanner)
        while r:
            stock = Stock()
            stock.id = r[0].row
            stock.code = r[0].columns.get("info:code").value
            stock_list.append(stock)
            r = HbaseUtil.client.scannerGet(scanner)
        HbaseUtil.client.scannerClose(scanner)
        HbaseUtil.socket.close()
        return stock_list

    '''
    返回所有StockIndex对象
    '''
    @staticmethod
    def get_all_stock_index():
        HbaseUtil.socket.open()
        stock_index_list = list()
        scanner = HbaseUtil.client.scannerOpen("stock_index", "", [])
        r = HbaseUtil.client.scannerGet(scanner)
        while r:
            stock_index = StockIndex()
            stock_index.id = r[0].row
            stock_index.code = r[0].columns.get("info:code").value
            stock_index_list.append(stock_index)
            r = HbaseUtil.client.scannerGet(scanner)
        HbaseUtil.client.scannerClose(scanner)
        HbaseUtil.socket.close()
        return stock_index_list

    '''
    向Hbase中批量插入StockRecord对象
    '''
    @staticmethod
    def put_stock_record_object_list():
        print(TimeUtil.date(), "put_stock_record_object_list method start")
        # 返回所有key，每个key是Stock的code属性
        all_key_list = RedisUtil.redis_conn.keys("StockRecord_*")
        # list类型，用于存储BatchMutation对象。用于存储所有股票的所有StockRecord记录。
        batch_mutation_list = list()
        HbaseUtil.socket.open()
        for key in all_key_list:
            # 返回某一只股票的所有StockRecord记录，并按date属性升序排列
            stock_record_list = RedisUtil.redis_conn.zrange(name=key, start=0, end=-1, withscores=True)
            if stock_record_list is not None and len(stock_record_list) > 0:
                # 将StockRecord对象插入到Hbase中
                for stock_record_tuple in stock_record_list:
                    stock_record_build = json.loads(stock_record_tuple[0])
                    stock_record = StockRecord()
                    stock_record.__dict__ = stock_record_build
                    # list类型，每一个元素是一个Mutation对象，其中存储列族、列和值。用于存储一只股票的一条StockRecord记录。
                    mutation_list = list()
                    mutation_list.append(Mutation(column="info:id", value=stock_record.id))
                    mutation_list.append(Mutation(column="info:date", value=stock_record.date.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:code", value=stock_record.code.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:open", value=stock_record.open.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:high", value=stock_record.high.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:close", value=stock_record.close.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:low", value=stock_record.low.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:last_close", value=stock_record.last_close.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:up_down", value=stock_record.up_down.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:up_down_amount", value=stock_record.up_down_amount.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:up_down_percentage", value=stock_record.up_down_percentage.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:turnover_rate", value=stock_record.turnover_rate.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:volume", value=stock_record.volume.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:amount", value=stock_record.amount.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:total_market_value", value=stock_record.total_market_value.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:circulation_market_value", value=stock_record.circulation_market_value.encode("utf-8")))
                    mutation_list.append(Mutation(column="info:five", value=stock_record.five.encode("utf-8") if stock_record.five is not None else str("")))
                    mutation_list.append(Mutation(column="info:ten", value=stock_record.ten.encode("utf-8") if stock_record.ten is not None else str("")))
                    mutation_list.append(Mutation(column="info:twenty", value=stock_record.twenty.encode("utf-8") if stock_record.twenty is not None else str("")))
                    mutation_list.append(Mutation(column="info:sixty", value=stock_record.sixty.encode("utf-8") if stock_record.sixty is not None else str("")))
                    mutation_list.append(Mutation(column="info:one_hundred_and_twenty", value=stock_record.one_hundred_and_twenty.encode("utf-8") if stock_record.one_hundred_and_twenty is not None else str("")))
                    mutation_list.append(Mutation(column="info:two_hundred_forty", value=stock_record.two_hundred_and_forty.encode("utf-8") if stock_record.two_hundred_and_forty is not None else str("")))
                    mutation_list.append(Mutation(column="info:ema12", value=stock_record.ema12.encode("utf-8") if stock_record.ema12 is not None else str("")))
                    mutation_list.append(Mutation(column="info:ema26", value=stock_record.ema26.encode("utf-8") if stock_record.ema26 is not None else str("")))
                    mutation_list.append(Mutation(column="info:dif", value=stock_record.dif.encode("utf-8") if stock_record.dif is not None else str("")))
                    mutation_list.append(Mutation(column="info:dea", value=stock_record.dea.encode("utf-8") if stock_record.dea is not None else str("")))
                    # BatchMutation类型，用于存储行键，所有的存储列族、列和值。表示一只股票的一条StockRecord记录。
                    batch_mutation = BatchMutation(stock_record.date.encode("utf-8")+"_"+stock_record.code.encode("utf-8"), mutation_list)
                    batch_mutation_list.append(batch_mutation)
        # 插入所有股票的所有StockRecord记录。
        # 此处需要一次就所有记录都插入，如果每只股票都插入一次的话，会抛出下面异常：
        # hbase.ttypes.IOError: IOError(message='org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 22 actions: IOException: 22 times, \n\tat org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:228)\n\tat org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:208)\n\tat org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1689)\n\tat org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208)\n\tat org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183)\n\tat org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1439)\n\tat org.apache.hadoop.hbase.client.HTable.put(HTable.java:1042)\n\tat org.apache.hadoop.hbase.thrift.ThriftServerRunner$HBaseHandler.mutateRowsTs(ThriftServerRunner.java:1329)\n\tat org.apache.hadoop.hbase.thrift.ThriftServerRunner$HBaseHandler.mutateRows(ThriftServerRunner.java:1275)\n\tat sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy.invoke(HbaseHandlerMetricsProxy.java:67)\n\tat com.sun.proxy.$Proxy10.mutateRows(Unknown Source)\n\tat org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$mutateRows.getResult(Hbase.java:4386)\n\tat org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$mutateRows.getResult(Hbase.java:4370)\n\tat org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)\n\tat org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)\n\tat org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer$ClientConnnection.run(TBoundedThreadPoolServer.java:289)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n')
        try:
            HbaseUtil.client.mutateRows("stock_record", batch_mutation_list)
        except Exception, e:
            # 使用python链接hbase thrift，在所有记录都插入后会抛出超时异常，因此在此处捕获
            HbaseUtil.socket.close()
        HbaseUtil.socket.close()


if __name__ == "__main__":
    stock_record_service = StockRecordService()
    stock_record_service.update()
    stock_record_object_list = stock_record_service.get_stock_record_object_list()
    HbaseUtil.put_stock_record_object_list(stock_record_object_list)

    print(TimeUtil.date())
